-
Notifications
You must be signed in to change notification settings - Fork 1.4k
feat: implement SEP-1699 SSE polling via server-side disconnect #1129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
commit: |
ebd4179 to
0b61e98
Compare
src/server/streamableHttp.test.ts
Outdated
| toolResolve!(); | ||
| }); | ||
|
|
||
| it('should support POST SSE polling with client reconnection', async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably the most interesting test
src/server/streamableHttp.test.ts
Outdated
| const primingEventId = primingIdMatch![1]; | ||
|
|
||
| // Server closes the stream to trigger polling | ||
| transport.closeSSEStream(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this feels slightly weird. The whole point is for the server to be able to close SSE streams, so maybe this method should be exposed on server?
On the other hand, this disconnection only makes sense in the SHTTP transport - whereas the server is actually transport agnostic.
Maybe the correct expectation is to have the server call this method directly via its reference to the transport if necessary and actually available, so this might be fine actually.`
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be called by the user on the server. A bunch of frameworks dont give the user easy access to the transport but they do the server. Proxy by reference is good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me it would feel very weird to have a closeSSEStream on the server that should have no knowledge of sessions let alone streams.
A bunch of frameworks dont give the user easy access to the transport but they do the server. Proxy by reference is good
This is a good point but maybe the right API is to expose the transport from the server?
a556d78 to
6368bfa
Compare
Add support for SSE retry field to enable server-controlled client reconnection timing. Client changes: - Capture server-provided retry field from SSE events - Use retry value for reconnection delay instead of exponential backoff - Reconnect on graceful stream close with Last-Event-ID header Server changes: - Add retryInterval option to StreamableHTTPServerTransportOptions - Send priming events with id/retry/empty-data when eventStore is configured - Add closeSSEStream(requestId) API to close POST SSE streams for polling - Priming events establish resumption capability before actual messages Tests: - Client: retry field capture, exponential backoff fallback, graceful reconnection - Server: priming events, retry field, stream closure, POST SSE polling flow
6368bfa to
464b1f8
Compare
|
Hi @jonathanhefner would love feedback on whether this accurately captures the intent of your original SEP @paoloricciuti in case you want to TAL as you had valuable input on modelcontextprotocol/conformance#35 |
paoloricciuti
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we actually had a brief discussion about this on discord and I think the spec should be clearer here: from the discussion we had the reconnection should be different from the standalone stream. So if the client was doing two POST requests each POST would have it's own lastEventId.
On reconnect this should basically result in 3 new GET SSE streams, two to get the remaining notificiations/responses from the POST requests and one as a standalone stream for the new notifications (and to resume the notifications eventually sent during the disconnection period).
However right now the server still errors out if there's more that one SSE stream. Should this be fixed? Is this even the right interpretation of the spec?
src/server/streamableHttp.test.ts
Outdated
| const primingEventId = primingIdMatch![1]; | ||
|
|
||
| // Server closes the stream to trigger polling | ||
| transport.closeSSEStream(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me it would feel very weird to have a closeSSEStream on the server that should have no knowledge of sessions let alone streams.
A bunch of frameworks dont give the user easy access to the transport but they do the server. Proxy by reference is good
This is a good point but maybe the right API is to expose the transport from the server?
jonathanhefner
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would love feedback on whether this accurately captures the intent of your original SEP
Yes, looks great! Thank you! 😃
src/server/streamableHttp.test.ts
Outdated
| toolResolve!(); | ||
|
|
||
| // Give the tool time to complete and store the result | ||
| await new Promise(resolve => setTimeout(resolve, 50)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if this is a concern, but this could lead to flaky tests. I'm not sure what we could await as an alternative though.
Addresses PR feedback from paoloricciuti requesting test coverage for the scenario where multiple messages are sent while the SSE client is disconnected. Uses a batch of tool calls to generate multiple responses that get stored and replayed on reconnection.
Ack, after the discusson on Discord seems clear we need to not error out on multiple streams. Still figuring out the right way to do that on this PR, will come back with an update soon, moving to draft for now. |
- Fix replayEvents to use streamId from last-event-id header - Add conflict check per streamId (not global) - Add missing close handler to clean up stream mapping - Add test demonstrating concurrent GET streams resuming different POST streams This aligns with the spec: "The client MAY remain connected to multiple SSE streams simultaneously."
151614e to
6d32c15
Compare
|
Horrible timeout based tests - but looking for feedback on the implementation before figuring out prettier tests. |
- Add closeStandaloneSSEStream() method to allow server to close the standalone GET notification stream, triggering client reconnection - Send priming event with retry field on GET streams (when eventStore configured) for resumability - Add tests for GET stream priming events and closeStandaloneSSEStream - Fix flaky test timeout for POST SSE polling test
d9761b6 to
8cbb07e
Compare
I think there should not be any changes required - this is primarily about server behavior, the SDK should handle resumption automatically. If we wanted to have some debugging insight into resumption / reconnection though that seems like it might be valuable? E.g. surfacing reconnection events, showing the polling, showing |
Would we need a demo server to generate some of this behavior, or is there one in the SDK examples we could use to test with? |
Good point, let me see if I can make an exampleServer... |
Per SEP-1699, clients should auto-reconnect via GET when server closes POST SSE streams mid-operation. This enables polling for long-running tool calls. Changes: - Enable isReconnectable=true for POST SSE streams in client - Add example client demonstrating SSE polling with server - Update test to expect GET reconnection after POST stream fails
The isReconnectable=true change was too aggressive - per SEP-1699, reconnection should only happen after server sends a priming event with an event ID, not on all POST stream failures. Keep the example client for now; proper reconnection logic TBD.
|
@cliffhall added
|
Per SEP-1699, servers may close SSE streams after sending a priming event with an event ID. This change enables automatic reconnection for POST-initiated streams once they've received at least one event with an ID. The change introduces `hasPrimingEvent` tracking in `_handleSseStream`: - GET streams remain always reconnectable (isReconnectable=true) - POST streams become reconnectable after receiving an event with ID - Reconnection uses `canResume = isReconnectable || hasPrimingEvent` This enables the SSE polling example client to work correctly with server-initiated disconnection and automatic client reconnection.
|
|
||
| // Resume the notification stream using lastEventId | ||
| // This is the key part - we're resuming the same long-running tool using lastEventId | ||
| await client2.request( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test was actually not spec compliant - we were doing a second post request to continue.
The spec actually requires all resumption requests to be GET requests.
|
Sorry I was at my company off-site and couldn't review it properly, will take a look tomorrow |
| * @param lastEventId The ID of the last received event for resumability | ||
| * @param attemptCount Current reconnection attempt count for this specific stream | ||
| */ | ||
| private _scheduleReconnection(options: StartSSEOptions, attemptCount = 0): void { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not for this PR but it would be good to be able to overwrite this method. For clients which aren't running all the time this will need some input externally from the client itself to wake it up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, made an issue for now #1162
src/server/streamableHttp.ts
Outdated
| this._streamMapping.set(streamId, res); | ||
|
|
||
| // Use streamId from getStreamIdForEventId if available, otherwise from replay | ||
| const finalStreamId = streamId ?? replayedStreamId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you explain this logic for me? I'm not sure I understand why the choice in ids. Wouldnt we only do this is the replayed one is now closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If getStreamIdForEventId is implemented: We use streamId from that lookup. This is the "proper" way - the event store explicitly tells us which stream the event belongs to, and we've already done conflict checking with this ID at line 403. We need this to be able to identify the correct stream encoded in the ID.
If getStreamIdForEventId isn't implemented (for backwards compat): streamId will be undefined, so we fall back to replayedStreamId returned from replayEventsAfter. This allows older event store implementations that don't implement the optional method to still work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, thinking through this again - I think I see what you're saying, we don't actually need streamId ?? replayedStreamId because there isn't really a case where streamId != replayedStreamId if getStreamIdForEventId is implemented correctly.
So we could probably simplify the below to just directly reference replayedStreamId.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pushed a commit to update this, lmk what you think.
mattzcarey
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tentative approval
- Fix _scheduleReconnection comment: delay can now come from server retry field, not just exponential backoff - Fix getStreamIdForEventId comment: SDK doesn't parse streamId::... format, it uses replayEventsAfter return value as fallback
Always use replayedStreamId from replayEventsAfter instead of preferring streamId from getStreamIdForEventId. Both should return the same value for a correctly implemented event store, and replayEventsAfter is the authoritative source since it's the required method.
findleyr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi! Just a few comments as I compared this to what I was planning for the Go SDK. Feel free to disregard, though I'm curious of your thoughts in any case.
|
|
||
| // Server decides to disconnect the client to free resources | ||
| // Client will reconnect via GET with Last-Event-ID after retryInterval | ||
| const transport = transports.get(extra.sessionId!); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the Go SDK, I was considering adding a (possibly nil) callback on extra, to avoid this type of coupling between tool and transport. Would that be valuable here?
| * Close an SSE stream for a specific request, triggering client reconnection. | ||
| * Use this to implement polling behavior during long-running operations - | ||
| * client will reconnect after the retry interval specified in the priming event. | ||
| */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: should this operation only be valid if there's an event store?
I don't actually know, but it seems a bit problematic that the tool has to be aware both that it's being served on a streamable transport, and that this transport has an event store.
If the intent is to implement this in the actual tool, then it might be better to pass a callback in the request extra.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should closeSSEStream take a retry duration? Again, if we're delegating control over this to the tool, one can imagine that the tool would want to be able to control its own replay pacing, and that the retry timeout for one tool might not be appropriate for another.

Summary
Implements SEP-1699 which enables servers to disconnect SSE connections at will by sending priming events and retry fields.
Motivation and Context
SEP-1699 introduces SSE polling behavior that allows servers to control client reconnection timing and close connections gracefully. This enables more efficient resource management on the server side while maintaining resumability.
We implement this on the
POSTSSE stream as implied by the SEP language linked above. I.e. when a server establishes an SSE stream:cancelSSEStreamto close the stream while still gathering the events.retryIntervalsupplied by the server before disconnection.How Has This Been Tested?
Breaking Changes
None. Client falls back to exponential backoff if no retry field is provided.
Types of changes
Checklist